Fivetran REST API で同期を実行し Webhook で通知する #Fivetran
はじめに
FIvetran では REST API が提供されており、API を使用したコネクタの同期が可能です。
これにより Cron などでカスタムスケジュールでの同期ができるようになります。また、Webhookを作成し同期完了などのイベント通知も可能なので、こちらの手順について記事としました。
前提条件
以下の設定で Destination と コネクタは構成済みとしています。
- Destination:Snowflake
- Connector:Amazon S3
API キーの取得
はじめに、以下に記載の手順で API キーを取得します。
Getting Started with Fivetran REST API
Fivetran ダッシュボードで「ユーザー名 > API キー」をクリックします。
「Generate API key」をクリックしキーを生成します。
認証
Fivetran REST API では API キー認証を使用します。 API へのリクエストごとに、Authorization
ヘッダーにBasic {api_key}:{api_secret}
を指定します。
この際、{api_key}:{api_secret}
部分は base64 でエンコードされている必要があります。エンコードされたキーも上記で生成されます。
コネクタの手動同期
通常 Fivetran ではコネクタを作成すると同期頻度として指定の間隔で、データを同期できます。
独自のスケジュールや何らかのイベントをトリガーに同期を行いたい場合は、API を使用し同期をスケジュールすることで対応できます。
コネクタID の取得
はじめに同期の対象となるコネクタのIDを取得します。コネクタID はダッシュボード上から確認できます。
こちらは API でも取得可能です。
コネクタの変更
以下のエンドポイントで対象のコネクタのschedule_type
を変更します。
# 環境変数からAPIキーとAPIシークレットを取得 API_KEY=$FIVETRAN_API_KEY API_SECRET_KEY=$FIVETRAN_API_SECRET_KEY # APIキーとAPIシークレットを結合してBase64エンコード AUTH_HEADER=$(echo -n "$API_KEY:$API_SECRET_KEY" | base64) #コネクタIDを代入 CONNECTOR_ID="<コネクタID>" #スケジュールタイプをmanualに変更 curl -X PATCH \ -H "Content-Type: application/json" \ -H "Authorization: Basic $AUTH_HEADER" \ -d '{ "schedule_type": "manual", "run_setup_tests": false }' \ https://api.fivetran.com/v1/connectors/{$CONNECTOR_ID}
出力
{ "code": "Success", "message": "Connector has been updated", "data": { "id": "<コネクタID>", "group_id": "<グループID>", "service": "s3", "service_version": 1, "schema": "s3.test", "connected_by": "known_rheumatic", "created_at": "2024-04-30T08:36:40.693207Z", "succeeded_at": "2024-04-30T09:17:10.664Z", "failed_at": null, "paused": false, "pause_after_trial": false, "sync_frequency": 360, "data_delay_threshold": 0, "data_delay_sensitivity": "NORMAL", "schedule_type": "manual", "status": { "setup_state": "connected", "schema_status": "ready", "sync_state": "scheduled", "update_state": "on_schedule", "is_historical_sync": false, "tasks": [], "warnings": [] } } }
実行後、ダッシュボード上でコネクタを確認すると、同期頻度が API による制御に変更されています。
同期の実行
API で同期を実行する際は、以下のエンドポイントを使用します。
#手動実行 curl -X POST \ -H "Content-Type: application/json" \ -H "Authorization: Basic $AUTH_HEADER" \ -d '{"force": true}' \ https://api.fivetran.com/v1/connectors/{$CONNECTOR_ID}/sync
出力
{ "code": "Success", "message": "Sync has been successfully forced for connector with id '<コネクタID>'" }
ダッシュボードを確認すると同期が開始されていることを確認できます。
Webhook によるイベント通知の構成
Fivetran では Webhook によるイベント通知の構成が可能です。
ここでは以下の手順で検証を行いました。
構成
Amazon API Gateway と AWS Lambda を使用し、Amazon SES によるメール通知を行います。Lambda と Amazon SES 間の構成は以下の手順を参照しました。
API Gateway と Lambda 間は以下を参考に構成を行いました。
FIvetran 側:Webhook の構成
Webhook の管理については、以下に記載があります。
一部の抜粋ですが、以下のイベントをサポートしています。
- sync_start
- sync_end
- status (deprecated)
- dbt_run_start
- dbt_run_succeeded
- dbt_run_failed
- resync_connector
- resync_table
ここでは sync_start
, sync_end
を対象とした Webhook を作成します。
また Webhook は、アカウントレベルとグループ(Destination)レベルで作成可能です。本記事ではグループレベルの Webhook を作成します。
はじめにグループIDを取得します。
#group id の取得 curl -X GET https://api.fivetran.com/v1/groups \ -H "Content-Type: application/json" \ -H "Authorization: Basic $AUTH_HEADER"
出力の一部
{ "id": "<グループID", "name": "Snowflake_yasuhara_test", "created_at": "2023-10-06T07:06:59.289141Z" },
グループIDを指定し、以下で Webhook を作成します。URL には、API Gateway のエンドポイントを指定します。
curl -X POST https://api.fivetran.com/v1/webhooks/group/${GROUP_ID} \ -H "Content-Type: application/json" \ -H "Authorization: Basic $AUTH_HEADER" \ -d '{ "url": "'$URL'", "events": ["sync_start","sync_end"], "active": true, "secret": "<シークレット>" }'
出力
{ "code": "Success", "message": "Group webhook has been created", "data": { "id": "candied_neural", "type": "group", "group_id": "<グループID>", "url": "<API Gateway URL>", "events": [ "sync_start", "sync_end" ], "active": true, "secret": "******", "created_at": "2024-05-01T01:24:47.138Z", "created_by": "known_rheumatic" } }
上記より、Lambda 関数には以下を使用しました。
import json import boto3 from botocore.exceptions import ClientError def lambda_handler(event, context): # メール送信用のSESクライアントを作成 ses_client = boto3.client('ses', region_name='us-east-2') # Webhookからのデータを取得 webhook_data = json.loads(event['body']) # イベントごとに適切なメールの本文を生成 email_body = generate_email_body(webhook_data) # メールを送信 try: response = ses_client.send_email( Source='<送信元のメールアドレス>', # 送信元のメールアドレス Destination={'ToAddresses': ['<送信先のメールアドレス>']}, # 送信先のメールアドレス Message={ 'Subject': {'Data': 'Fivetran Webhook Notification'}, # メールの件名 'Body': {'Text': {'Data': email_body}} # メールの本文 } ) except ClientError as e: print(e.response['Error']['Message']) return { 'statusCode': 500, 'body': json.dumps('Failed to send email') } else: print("Email sent! Message ID:", response['MessageId']) return { 'statusCode': 200, 'body': json.dumps('Email sent successfully') } def generate_email_body(webhook_data): connector_name = webhook_data['connector_name'] # イベントごとに適切なメールの本文を生成 if webhook_data['event'] == 'sync_end': status = webhook_data['data']['status'] # statusに応じてメールの本文を変更 if status == 'SUCCESSFUL': body = f"The sync process for connector '{connector_name}' ended successfully." else: body = f"The sync process for connector '{connector_name}' failed." elif webhook_data['event'] == 'sync_start': body = f"The sync process for connector '{connector_name}' has started." else: body = "Unsupported event type." return body
Fivetran が送信する Webhook のペイロードの構造は以下に例があるので、こちらを参考に sync_start
, sync_end
のイベントごとに通知を行いsync_end
時はそのステータスに応じてメッセージ内容を変えています。
Webhook のテスト
作成した Webhook は以下のエンドポイントでテスト可能です。
同期の実行
さいごに API 経由で同期を実行し、Webhook 経由で通知が届くか確認します。
#同期の実行 curl -X POST \ -H "Content-Type: application/json" \ -H "Authorization: Basic $AUTH_HEADER" \ -d '{"force": true}' \ https://api.fivetran.com/v1/connectors/{$CONNECTOR_ID}/sync
コマンド実行後、送信先メールアドレスに以下の内容でメールが届きました。
その後すぐ、同期が完了したことを知らせるメールも届きました。
次に、S3 上にフォーマットが異なるファイルを配置し、同期を失敗させます。
この場合、下図の通りメールが届きました。
さいごに
API 経由で同期の実行を試してみました。任意のスケジュールで同期を実行したい場合はこちらが選択肢になり得るかと思います。
こちらの内容が何かの参考になれば幸いです。